package com.googlecode.jobop.support;

import static java.lang.String.format;
import static org.apache.zookeeper.CreateMode.EPHEMERAL;

import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.googlecode.jobop.domain.Provider;
import com.googlecode.jobop.notify.Notify;
import com.googlecode.jobop.notify.NotifyReceiver;
import com.googlecode.jobop.notify.ProviderRegistNotify;
import com.googlecode.jobop.utils.IpUtil;
import com.googlecode.jobop.utils.JobopNotifyUtil;
import com.googlecode.jobop.zk.factory.Zkfactory;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.BackgroundCallback;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.framework.state.ConnectionStateListener;

/**
 * Support component that registers jobop service providers in ZooKeeper.
 * <p>
 * On {@link #init()} it subscribes itself as a jobop notification receiver and
 * attaches a connection-state listener to the ZooKeeper client. Every
 * {@link ProviderRegistNotify} it receives is turned into an ephemeral node
 * under {@code /jobop/providers/<appGroup>/<methodSign>/<ip>}. When the client
 * reports {@code RECONNECTED}, a registration notify is posted again for every
 * provider recorded so far.
 * <p>
 * All state is held in static fields, so it is shared by every instance of
 * this class.
 * 
 * @author van
 * 
 */
public class JobopProviderRegistrySupport implements JobopSupport, NotifyReceiver {
	private static final Logger logger = LoggerFactory.getLogger(JobopProviderRegistrySupport.class);
	/**
	 * Start-up flag: 0 means "not started"; any other value is the start time
	 * in milliseconds. It also guarantees that initialisation runs only once.
	 */
	private static final AtomicLong startAndtimeFlag = new AtomicLong(0);
	/**
	 * Providers for which a registration has been attempted, keyed by
	 * {@code Provider.getKey()}. Replayed when the connection is re-established.
	 */
	private static final Map<String, Provider> providers = new ConcurrentHashMap<String, Provider>();

	/**
	 * ZooKeeper client. Volatile because it is assigned in {@link #init()} and
	 * read from whichever threads deliver notifications.
	 */
	private static volatile CuratorFramework client = null;

	/**
	 * Initialises the shared registration infrastructure. Only the first call
	 * (the one that flips the start flag from 0) does any work; later calls are
	 * no-ops until {@link #destory()} has reset the flag.
	 */
	@Override
	public void init() {
		// Runs only once: the CAS succeeds for exactly one caller.
		if (!startAndtimeFlag.compareAndSet(0, System.currentTimeMillis())) {
			return;
		}

		// Subscribe to jobop notifications to observe registration requests.
		// Guarded by the flag so that only one receiver is ever registered and
		// notifications are not processed twice.
		JobopNotifyUtil.regisetReceiver(this);

		final CuratorFramework zk = Zkfactory.newInstance().getClient();
		client = zk;

		// Watch the ZooKeeper connection state; again only one listener is added.
		zk.getConnectionStateListenable().addListener(new ConnectionStateListener() {
			/**
			 * Re-posts a registration notify for every known provider after a
			 * reconnect. (Open question from the original author: after a
			 * connection event, how do we know all services are ready to be
			 * registered?)
			 */
			@Override
			public void stateChanged(CuratorFramework client, ConnectionState state) {
				if (state == ConnectionState.RECONNECTED) {
					for (Provider provider : providers.values()) {
						JobopNotifyUtil.postNotify(new ProviderRegistNotify(provider));
					}
				}
			}
		});
	}

	/**
	 * Closes the ZooKeeper client if {@link #init()} has run, and resets the
	 * start flag to 0 so that the close happens only once.
	 */
	@Override
	public void destory() {
		// getAndSet atomically clears the flag; only the caller that observes a
		// non-zero value performs the close. (The previous compareAndSet(0, 0)
		// never cleared the flag, so every call closed the client again.)
		if (startAndtimeFlag.getAndSet(0) != 0) {
			final CuratorFramework zk = client;
			// init() may have claimed the flag but failed before assigning the client.
			if (zk != null) {
				zk.close();
			}
		}
	}

	/**
	 * Accepts only {@link ProviderRegistNotify} instances that carry content.
	 * 
	 * @param notify
	 *            the notification offered to this receiver
	 * @return {@code true} if {@code notify} is a {@link ProviderRegistNotify}
	 *         with non-null content
	 */
	@Override
	public boolean canReceive(Notify<?> notify) {
		return notify instanceof ProviderRegistNotify && notify.getContent() != null;
	}

	/**
	 * Registers the provider carried by the notification.
	 * 
	 * @param notify
	 *            a notification whose content is a {@link Provider}
	 */
	@Override
	public void doReceive(Notify<?> notify) {
		Provider provider = (Provider) notify.getContent();
		registProvider(provider);
	}

	/**
	 * Registers a service provider in ZooKeeper.
	 * <p>
	 * Ensures the persistent path
	 * {@code /jobop/providers/<appGroup>/<methodSign>} exists, then creates an
	 * ephemeral child named after the local IP in the background. Failures are
	 * logged and never propagated to the caller.
	 * 
	 * @param provider
	 *            the provider to register
	 */
	public void registProvider(final Provider provider) {
		final String key = provider.getKey();
		if (providers.containsKey(key)) {
			logger.info("provider {} is already registered, registering it again (known providers: {})", key,
					providers.keySet());
		}

		final CuratorFramework zk = client;
		if (zk == null) {
			logger.error("cannot register provider {}: zookeeper client is not initialised, call init() first", key);
			return;
		}

		final String path = format("/jobop/providers/%s/%s", provider.getAppGroup(), provider.getMethodSign());
		// Create the parent path when it does not exist yet.
		try {
			if (zk.checkExists().forPath(path) == null) {
				zk.create().creatingParentsIfNeeded().forPath(path);
			}
		} catch (Exception e) {
			// Another registrant may have created the path between the check and
			// the create, so keep going and let the ephemeral create decide.
			logger.warn(format("could not ensure provider path %s exists", path), e);
		}

		final String nodePath = format("%s/%s", path, IpUtil.getIp());
		try {
			// Ephemeral node: removed by ZooKeeper when this client's session ends.
			zk.create().withMode(EPHEMERAL).inBackground(new BackgroundCallback() {
				@Override
				public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
					// Record the provider whatever the outcome, so that it is
					// replayed on the next RECONNECTED event.
					providers.put(key, provider);
					if (event.getResultCode() != 0) {
						// A non-zero code is a ZooKeeper error code; "node exists"
						// is expected when re-registering within a live session.
						logger.warn("background registration of {} finished with zookeeper result code {}", nodePath,
								event.getResultCode());
					}
				}

			}).forPath(nodePath);
		} catch (Exception e) {
			logger.error("注册服务提供者失败", e);
		}
	}
}
